package org.example.util;

/**
 * Kafka utility class: singleton producer, consumer factories, and a simple
 * polling-consumer wrapper. (Note: this comment precedes the imports, so it
 * does not attach to the class declaration as Javadoc.)
 */

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class KafkaUtils {

    /**
     * Kafka bootstrap servers, resolved once at class load from kafka.properties,
     * with a localhost fallback instead of a silent null.
     */
    private static final String BOOTSTRAP_SERVERS = resolveBootstrapServers();

    /** Utility class — no instances. */
    private KafkaUtils() {
    }

    /**
     * Reads {@code bootstrap.servers} from kafka.properties. Falls back to
     * {@code localhost:9092} when the key is absent or blank, so the client
     * fails with a connection error rather than an obscure null-config error.
     */
    private static String resolveBootstrapServers() {
        String servers = ConfigLoader.loadProperties("kafka.properties").getProperty("bootstrap.servers");
        if (servers == null || servers.trim().isEmpty()) {
            log.warn("bootstrap.servers missing from kafka.properties, falling back to localhost:9092");
            return "localhost:9092";
        }
        return servers;
    }

    /**
     * Initialization-on-demand holder: the JVM class-loading guarantee makes
     * INSTANCE lazily created exactly once, yielding a thread-safe singleton
     * producer without explicit locking.
     */
    private static class ProducerHolder {
        private static final KafkaProducer<String, String> INSTANCE = createProducer();

        private static KafkaProducer<String, String> createProducer() {
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            // SASL_PLAINTEXT public-network access with the PLAIN mechanism.
            // NOTE(review): sasl.jaas.config is not set here — presumably the
            // credentials come from an external JAAS file or JVM option; confirm,
            // otherwise authentication will fail at runtime.
            properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
            properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

            // TODO: reliability settings (acks, retries, enable.idempotence) are still unset.
            return new KafkaProducer<>(properties);
        }
    }

    /**
     * Returns the shared producer singleton (created lazily on first call).
     *
     * @return the singleton {@link KafkaProducer}
     */
    public static KafkaProducer<String, String> getProducerInstance() {
        return ProducerHolder.INSTANCE;
    }

    /**
     * Creates a Kafka consumer with the default settings.
     *
     * @param groupId consumer group id
     * @return a new KafkaConsumer; the caller is responsible for closing it
     */
    public static KafkaConsumer<String, String> createKafkaConsumer(String groupId) {
        return createKafkaConsumer(groupId, null);
    }

    /**
     * Creates a Kafka consumer; entries in {@code customProps} override the defaults.
     *
     * @param groupId     consumer group id
     * @param customProps extra configuration properties, may be null
     * @return a new KafkaConsumer; the caller is responsible for closing it
     */
    public static KafkaConsumer<String, String> createKafkaConsumer(String groupId, Properties customProps) {
        Properties props = new Properties();

        // Base configuration.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // SASL_PLAINTEXT public-network access with the PLAIN mechanism
        // (same review note as ProducerHolder: sasl.jaas.config is not set here).
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

        // Defaults: start from the latest offset, auto-commit once per second.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");

        // Custom properties win over the defaults above.
        if (customProps != null) {
            props.putAll(customProps);
        }

        return new KafkaConsumer<>(props);
    }

    /**
     * Sends a message synchronously, blocking until the broker acknowledges.
     * Failures are logged rather than rethrown (best-effort contract, as before).
     *
     * @param producer  producer to send with
     * @param msgRecord record to send
     */
    public static void sendMessage(KafkaProducer<String, String> producer, ProducerRecord<String, String> msgRecord) {
        try {
            RecordMetadata metadata = producer.send(msgRecord).get();
            log.info("send message success,topic:{},partition:{},offset:{}", metadata.topic(), metadata.partition(),
                    metadata.offset());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            log.error("send message error,", e);
        } catch (Exception e) {
            log.error("send message error,", e);
        }
    }

    /**
     * Sends a message asynchronously; the outcome is reported via a log-only callback.
     *
     * @param producer  producer to send with
     * @param msgRecord record to send
     */
    public static void sendMessageAsync(KafkaProducer<String, String> producer,
            ProducerRecord<String, String> msgRecord) {
        producer.send(msgRecord, (metadata, exception) -> {
            if (exception != null) {
                log.error("send message error,", exception);
            } else {
                log.info("send message success,topic:{},partition:{},offset:{}", metadata.topic(),
                        metadata.partition(), metadata.offset());
            }
        });
    }

    /**
     * Creates AND starts a consumer for the given topics.
     *
     * @param topics         topics to subscribe to
     * @param groupId        consumer group id
     * @param messageHandler per-record handler
     * @return the running GenericKafkaConsumer
     */
    public static GenericKafkaConsumer startConsumer(List<String> topics, String groupId,
            MessageHandler messageHandler) {
        return startConsumer(topics, groupId, messageHandler, null);
    }

    /**
     * Creates AND starts a consumer for the given topics.
     *
     * @param topics         topics to subscribe to
     * @param groupId        consumer group id
     * @param messageHandler per-record handler
     * @param customProps    extra consumer configuration, may be null
     * @return the running GenericKafkaConsumer
     */
    public static GenericKafkaConsumer startConsumer(List<String> topics, String groupId,
            MessageHandler messageHandler, Properties customProps) {
        GenericKafkaConsumer consumer = new GenericKafkaConsumer(topics, groupId, messageHandler, customProps);
        // Actually start polling — previously this was commented out and the
        // method returned a dormant consumer despite its name. start() is
        // CAS-guarded, so callers that also call start() only get a warning.
        consumer.start();
        return consumer;
    }

    /**
     * Closes the singleton producer, flushing any buffered records.
     * Note: touching ProducerHolder.INSTANCE creates the producer if it was
     * never used, matching the original behavior.
     */
    public static void closeProducer() {
        try {
            ProducerHolder.INSTANCE.close();
        } catch (Exception e) {
            log.error("close producer error,", e);
        }
    }

    /**
     * Generic polling consumer that runs on its own thread.
     *
     * <p>Shutdown uses {@link KafkaConsumer#wakeup()} — the only method of
     * KafkaConsumer that is safe to call from another thread — and the
     * consumer is closed on the polling thread itself, since KafkaConsumer is
     * not thread-safe and closing it concurrently with poll() can throw
     * ConcurrentModificationException.
     */
    public static class GenericKafkaConsumer {
        private final KafkaConsumer<String, String> consumer;
        private final MessageHandler messageHandler;
        private final AtomicBoolean running = new AtomicBoolean(false);
        // volatile: start() and stop() may be invoked from different threads.
        private volatile Thread consumerThread;
        private final List<String> topics;

        public GenericKafkaConsumer(List<String> topics, String groupId, MessageHandler messageHandler,
                Properties customProps) {
            this.topics = topics;
            this.messageHandler = messageHandler;
            this.consumer = createKafkaConsumer(groupId, customProps);
            this.consumer.subscribe(topics);

            log.info("创建Kafka消费者成功, topics: {}, groupId: {}", topics, groupId);
        }

        /**
         * Starts the polling thread; a second call while running only warns.
         */
        public void start() {
            if (running.compareAndSet(false, true)) {
                consumerThread = new Thread(this::consumeMessages, "kafka-consumer-thread");
                consumerThread.start();
                log.info("Kafka消费者已启动, topics: {}", topics);
            } else {
                log.warn("Kafka消费者已经在运行中");
            }
        }

        /**
         * Stops the consumer: aborts a blocking poll() via wakeup() and waits
         * up to 5 seconds for the polling thread to exit. The consumer itself
         * is closed by the polling thread (see consumeMessages); if start()
         * was never called, it is closed here directly.
         */
        public void stop() {
            if (running.compareAndSet(true, false)) {
                log.info("正在停止Kafka消费者...");

                Thread thread = consumerThread;
                if (thread != null) {
                    // wakeup() is the only thread-safe KafkaConsumer call; it
                    // makes a blocking poll() throw WakeupException on the
                    // polling thread, which then closes the consumer.
                    consumer.wakeup();
                    try {
                        thread.join(5000); // wait at most 5 seconds
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        log.warn("等待消费者线程结束时被中断");
                    }
                } else {
                    // Never started: no other thread owns the consumer, so it
                    // is safe to close it from here.
                    consumer.close();
                    log.info("Kafka消费者已关闭");
                }
            }
        }

        /**
         * Polling loop: dispatches each record to the handler; handler failures
         * are routed to handleError so one bad record does not kill the loop.
         * Runs until stop() flips the flag or wakeup() aborts the poll.
         */
        private void consumeMessages() {
            try {
                while (running.get()) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

                    for (ConsumerRecord<String, String> record : records) {
                        try {
                            messageHandler.handle(record.topic(), record.key(), record.value(),
                                    record.partition(), record.offset());
                        } catch (Exception e) {
                            messageHandler.handleError(record.topic(), record.key(), record.value(),
                                    record.partition(), record.offset(), e);
                        }
                    }
                }
            } catch (WakeupException e) {
                // Expected during stop(); only unexpected if still running.
                if (running.get()) {
                    log.error("消费消息时发生异常", e);
                }
            } catch (Exception e) {
                if (running.get()) {
                    log.error("消费消息时发生异常", e);
                }
            } finally {
                // Close on the owning thread — KafkaConsumer is not thread-safe.
                consumer.close();
                log.info("Kafka消费者已关闭");
                log.info("Kafka消费者线程已退出");
            }
        }

        /**
         * Checks whether the consumer is currently running.
         *
         * @return true if running
         */
        public boolean isRunning() {
            return running.get();
        }
    }

    public static void main(String[] args) {
        KafkaProducer<String, String> producer = KafkaUtils.getProducerInstance();
        ProducerRecord<String, String> msgRecord = new ProducerRecord<>("test-topic", "Hello World,你好 中国");
        KafkaUtils.sendMessage(producer, msgRecord);
        // Flush and release producer resources before the JVM exits.
        KafkaUtils.closeProducer();
    }

}
